package day05;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

/**
 * Flink Table API &amp; SQL — writing query results out to Kafka.
 *
 * @author lvbingbing
 * @date 2022-01-18 20:56
 */
public class FlinkTableApi03 {

    /**
     * Entry point: reads sensor rows from Kafka, filters them, and writes the
     * result back to another Kafka topic.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to start or execute
     */
    public static void main(String[] args) throws Exception {
        // 1. Obtain the stream execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A single parallel subtask keeps the example's output simple and ordered.
        env.setParallelism(1);
        // 2. Read from Kafka, transform, and write the result back to Kafka.
        studyWriteToKafka(env);
        // 3. Trigger execution — Flink builds the job lazily until execute() is called.
        env.execute();
    }

    /**
     * Reads sensor data from the "sensor" Kafka topic, keeps only the rows
     * belonging to {@code sensor_6}, and writes (id, temp) pairs to the
     * "sinktest" Kafka topic in CSV format.
     *
     * <p>NOTE(review): the original code also built an aggregate table
     * ({@code groupBy("id").select("id.count, temp.avg")}) that was never used.
     * It was removed as dead code — this legacy Kafka table sink is
     * append-only and cannot accept the retraction rows a grouped aggregation
     * produces, so the aggregate could never have been emitted here anyway.
     *
     * @param env the stream execution environment the job graph is attached to
     */
    private static void studyWriteToKafka(StreamExecutionEnvironment env) {
        // 1. Create the table environment on top of the streaming environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Register a source table backed by the "sensor" Kafka topic,
        //    parsing each message as a CSV row of (id, timestamp, temp).
        tableEnv.connect(new Kafka()
                        .version("0.11")
                        .topic("sensor")
                        .property("zookeeper.connect", "hadoop102:2181")
                        .property("bootstrap.servers", "hadoop102:9092")
                )
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE())
                ).createTemporaryTable("inputTable");

        // 3. Query: project (id, temp) and keep only rows from sensor_6.
        Table sensorTable = tableEnv.from("inputTable");
        Table resultTable = sensorTable.select("id, temp")
                .filter("id === 'sensor_6'");

        // 4. Register the sink table backed by the "sinktest" Kafka topic.
        //    The sink schema matches the projected query: (id, temp) only.
        tableEnv.connect(new Kafka()
                        .version("0.11")
                        .topic("sinktest")
                        .property("zookeeper.connect", "hadoop102:2181")
                        .property("bootstrap.servers", "hadoop102:9092")
                )
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("temp", DataTypes.DOUBLE()))
                .createTemporaryTable("outputTable");

        // Emit the filtered rows into the registered Kafka sink.
        resultTable.insertInto("outputTable");
    }
}